/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.server.master.runner;

import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.process.ProcessService;

import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Master scheduler thread.
 *
 * <p>While {@link Stopper#isRunning()} is true, this thread checks local resource headroom and the
 * ZooKeeper client state. It then takes the ZooKeeper master mutex and picks up one {@link Command}
 * from the database. If that command yields a {@link ProcessInstance}, the instance is handed to a
 * {@link MasterExecThread} on the master exec thread pool.
 */
@Service
public class MasterSchedulerService extends Thread {

    /**
     * logger of MasterSchedulerThread
     */
    private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class);

    /**
     * dolphinscheduler database interface (command / process instance persistence)
     */
    @Autowired
    private ProcessService processService;

    /**
     * zookeeper master client; provides the client state and the scheduling mutex
     */
    @Autowired
    private ZKMasterClient zkMasterClient;

    /**
     * master config
     */
    @Autowired
    private MasterConfig masterConfig;

    /**
     *  netty remoting client, shared with every MasterExecThread
     */
    private NettyRemotingClient nettyRemotingClient;

    /**
     * master exec service: thread pool that runs MasterExecThread instances
     */
    private ThreadPoolExecutor masterExecService;


    /**
     * Creates the exec thread pool and the netty remoting client once dependencies are injected.
     * The pool size is taken from {@link MasterConfig#getMasterExecThreads()}.
     */
    @PostConstruct
    public void init(){
        this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads());
        NettyClientConfig clientConfig = new NettyClientConfig();
        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
    }

    /**
     * Names this thread "MasterSchedulerThread" and starts it; the new thread executes {@link #run()}.
     */
    @Override
    public void start(){
        super.setName("MasterSchedulerThread");
        super.start();
    }

    /**
     * Shuts down the exec pool, waiting up to 5 seconds for it to terminate, then closes the
     * netty remoting client.
     */
    public void close() {
        masterExecService.shutdown();
        boolean terminated = false;
        try {
            terminated = masterExecService.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller, but still release the remaining resources.
            Thread.currentThread().interrupt();
        }
        if(!terminated){
            logger.warn("masterExecService shutdown without terminated, increase await time");
        }
        nettyRemotingClient.close();
        logger.info("master schedule service stopped...");
    }

    /**
     * Scheduling loop of MasterSchedulerThread.
     *
     * <p>Each iteration does the following:
     * <ul>
     *   <li>Skip scheduling (after a short sleep) when {@code OSUtils.checkResource} rejects the
     *       configured CPU-load / reserved-memory limits.</li>
     *   <li>Otherwise try to schedule one command while the ZooKeeper client is {@code STARTED}.</li>
     *   <li>Back off whenever nothing was scheduled.</li>
     * </ul>
     * The loop exits when {@link Stopper#isRunning()} turns false or the thread is interrupted.
     */
    @Override
    public void run() {
        logger.info("master scheduler started");
        while (Stopper.isRunning()){
            try {
                boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());
                if(!runCheckFlag) {
                    // Host is too loaded: do not schedule anything this round.
                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                    continue;
                }
                boolean commandFound = false;
                if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
                    commandFound = scheduleProcess();
                }
                if (!commandFound) {
                    // No command was available, or the ZK client is not started yet. Back off
                    // instead of spinning. This runs outside the distributed mutex, so other
                    // masters are not blocked while we wait.
                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                }
            } catch (InterruptedException e) {
                // Treat interruption as a stop request and restore the flag for the thread's owner.
                Thread.currentThread().interrupt();
                logger.warn("master scheduler thread interrupted, stop scheduling");
                break;
            } catch (Exception e) {
                logger.error("master scheduler thread error", e);
            }
        }
    }

    /**
     * Acquires the ZooKeeper master mutex and tries to schedule a single command.
     *
     * <p>If handling the command fails, the command is moved to the error-command store via
     * {@code processService.moveToErrorCommand}. The mutex is always released before returning.
     *
     * @return true if a command was fetched (whether or not it produced a process instance),
     *         false if there was no command to handle
     * @throws Exception propagated from acquiring/releasing the mutex or from fetching the command
     */
    private boolean scheduleProcess() throws Exception {
        InterProcessMutex mutex = null;
        try {
            mutex = zkMasterClient.blockAcquireMutex();
            int activeCount = masterExecService.getActiveCount();
            // make sure to scan and delete command  table in one transaction
            Command command = processService.findOneCommand();
            if (command == null) {
                // Let the caller sleep after the mutex has been released.
                return false;
            }
            logger.info("find one command: id: {}, type: {}", command.getId(),command.getCommandType());
            try{
                // Pass the number of currently idle exec threads to handleCommand
                // (presumably used to bound the work it accepts -- verify in ProcessService).
                ProcessInstance processInstance = processService.handleCommand(logger, getLocalAddress(),
                        this.masterConfig.getMasterExecThreads() - activeCount, command);
                if (processInstance != null) {
                    logger.info("start master exec thread , split DAG ...");
                    masterExecService.execute(new MasterExecThread(processInstance, processService, nettyRemotingClient));
                }
            }catch (Exception e){
                logger.error("scan command error ", e);
                processService.moveToErrorCommand(command, e.toString());
            }
            return true;
        } finally{
            zkMasterClient.releaseMutex(mutex);
        }
    }

    /**
     * @return this master's address, built by {@link NetUtils#getAddr} from the configured listen port
     */
    private String getLocalAddress() {
        return NetUtils.getAddr(masterConfig.getListenPort());
    }
}
